package xiaoa.java.es.client.impl;

import xiaoa.java.es.bean.IGroupResult;
import xiaoa.java.es.bean.PaginationSupport;
import xiaoa.java.es.bean.Params;
import xiaoa.java.es.client.EsClient;
import xiaoa.java.es.client.EsServerConfig;
import xiaoa.java.es.utils.EsSearchUtils;
import xiaoa.java.log.L;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.search.Query;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.tophits.TopHits;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import com.alibaba.fastjson.JSON;

public class EsClientImpl extends EsClient {

	
	/**
	 * Runs a paged search and converts the source of every returned hit into
	 * an instance of {@code resultCla}.
	 *
	 * @param params    search parameters; they determine the Lucene query, the
	 *                  search engine (client), the indices and the document type
	 * @param page      1-based page number (the offset is {@code (page - 1) * pageSize})
	 * @param pageSize  number of hits per page
	 * @param resultCla class each hit's source is deserialized into
	 * @return the converted hits of the requested page together with the total hit count
	 * @throws Exception if building the query, obtaining the client or executing the search fails
	 */
	@Override
	public <R> PaginationSupport<R> searchList(Params params, int page, int pageSize, Class<R> resultCla) throws Exception {

		// Translate the search parameters into a Lucene query.
		Query query = EsSearchUtils.createLuceneQuery(params);

		// Select the client for the requested search engine and prepare the paged request.
		TransportClient client = getClient(params.getSearchEngineType());
		SearchRequestBuilder request = client.prepareSearch(getSearchIndex(params))
				.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
				.setTypes(EsSearchUtils.getType(params))
				.setFrom((page - 1) * pageSize)
				.setSize(pageSize);

		// Same guard as stat(): when no query could be built, the request is
		// sent without a query clause instead of failing on query.toString().
		if (query != null) {
			request.setQuery(new QueryStringQueryBuilder(query.toString()));
		}

		// Log the request that is about to be sent.
		L.info(params.getTransactionId(), " normal", request.toString());

		SearchResponse response = request.execute().actionGet(TimeUnit.MINUTES.toMillis(5));

		// Total number of matching documents (not just the ones on this page).
		long total = response.getHits().getTotalHits();

		SearchHit[] hits = response.getHits().getHits();
		List<R> results = new ArrayList<R>(hits.length);
		for (SearchHit hit : hits) {
			// Round-trip through JSON to map the source map onto the result class.
			String json = JSON.toJSONString(hit.getSource());
			results.add(JSON.parseObject(json, resultCla));
		}

		return new PaginationSupport<R>(results, total);
	}

	/**
	 * Runs a grouped (de-duplicating) search: documents are bucketed by a
	 * terms aggregation and one representative hit is returned per bucket,
	 * with the bucket's document count stored under the {@code "repeatNum"} key
	 * of its source before conversion.
	 *
	 * <p>The grouping field is {@code params.getStatField()} when it is set and
	 * non-empty, otherwise {@code GROUPNAME}.
	 *
	 * <p>NOTE(review): the total passed to {@link PaginationSupport} is the total
	 * number of matching documents, not the number of groups — confirm this is
	 * what callers expect for pagination.
	 *
	 * @param params    search parameters; they determine the query, client, indices and type
	 * @param page      1-based page number
	 * @param pageSize  number of groups per page
	 * @param resultCla class each representative hit's source is deserialized into
	 * @return the converted representative hits of the requested page and the total hit count
	 * @throws Exception if building the query, obtaining the client or executing the search fails
	 */
	@Override
	public <R> PaginationSupport<R> searchGroup(Params params, int page, int pageSize, Class<R> resultCla) throws Exception {

		// Translate the search parameters into a Lucene query.
		Query query = EsSearchUtils.createLuceneQuery(params);

		// Grouping field: caller-supplied stat field, falling back to the default.
		String groupName = GROUPNAME;
		if (params.getStatField() != null && !params.getStatField().isEmpty()) {
			groupName = params.getStatField();
		}

		// Terms aggregation holding every bucket up to the end of the requested
		// page; each bucket carries exactly one top hit as its representative.
		TermsAggregationBuilder grouping = AggregationBuilders.terms(PARENTARRNAME)
				.field(groupName)
				.size(page * pageSize)
				.subAggregation(AggregationBuilders.topHits(CHILDARRNAME).size(1).from(0));

		// Select the client for the requested search engine and prepare the request.
		TransportClient client = getClient(params.getSearchEngineType());
		SearchRequestBuilder request = client.prepareSearch(getSearchIndex(params))
				.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
				.setTypes(EsSearchUtils.getType(params));

		// Same guard as stat(): skip the query clause when no query could be built.
		if (query != null) {
			request.setQuery(new QueryStringQueryBuilder(query.toString()));
		}

		// Only the aggregation and the total hit count are read below, so no
		// plain hits are requested (size 0 still reports the total).
		request.addAggregation(grouping).setFrom(0).setSize(0);

		// Log the request that is about to be sent.
		infoQuery(params.getTransactionId()+ ":normal:" + request.toString());

		SearchResponse response = request.execute().actionGet(TimeUnit.MINUTES.toMillis(5));

		if (isDebug()) {
			LinkedHashMap<String, Object> logMap = new LinkedHashMap<>();
			logMap.put("response", response.toString());
			debug("searchGroup", logMap);
		}

		// Total number of matching documents.
		long total = response.getHits().getTotalHits();
		List<R> results = new ArrayList<R>();

		Terms agg = response.getAggregations() == null ? null : (Terms) response.getAggregations().get(PARENTARRNAME);
		if (agg == null) {
			// No aggregation in the response: nothing to page through.
			return new PaginationSupport<>(results, total);
		}

		// The aggregation returns buckets from the first one on, so the
		// requested page is the tail slice [start, end) of that list.
		List<? extends Bucket> buckets = agg.getBuckets();
		int start = (page - 1) * pageSize;
		int end = Math.min(page * pageSize, buckets.size());

		for (int i = start; i < end; i++) {
			Bucket bucket = buckets.get(i);
			TopHits topHits = bucket.getAggregations().get(CHILDARRNAME);
			SearchHit[] representatives = topHits.getHits().getHits();
			if (representatives.length == 0) {
				// Bucket without a representative document: skip it instead of
				// failing with ArrayIndexOutOfBoundsException.
				continue;
			}

			SearchHit hit = representatives[0];
			// Expose the size of the group to the result object.
			hit.getSource().put("repeatNum", bucket.getDocCount());

			// Round-trip through JSON to map the source map onto the result class.
			String json = JSON.toJSONString(hit.getSource());
			results.add(JSON.parseObject(json, resultCla));
		}

		return new PaginationSupport<>(results, total);
	}
	
	

	/**
	 * Computes a terms statistic: the most frequent values of
	 * {@code params.getStatField()} among the documents matching {@code params},
	 * with the number of documents per value.
	 *
	 * @param params    search parameters; they determine the query, the stat field,
	 *                  the client, the indices and the type
	 * @param statCount maximum number of values (buckets) to return
	 * @param minCount  minimum number of documents a value must have to be reported;
	 *                  applied only when greater than 1, otherwise the
	 *                  aggregation's default threshold is left untouched
	 * @return one {@link IGroupResult} per returned bucket, in the order delivered
	 *         by the aggregation; empty when the response carries no such aggregation
	 * @throws ArithmeticException if a bucket's document count does not fit in an {@code int}
	 * @throws Exception if building the query, obtaining the client or executing the search fails
	 */
	@Override
	public List<IGroupResult> stat(Params params, int statCount,int minCount) throws Exception {

		// Translate the search parameters into a Lucene query.
		Query query = EsSearchUtils.createLuceneQuery(params);

		// Terms aggregation over the stat field, limited to statCount buckets.
		TermsAggregationBuilder topTerms = AggregationBuilders.terms("top")
				.field(params.getStatField())
				.size(statCount);
		if (minCount > 1) {
			// Honour the caller's threshold; it was previously ignored.
			topTerms.minDocCount(minCount);
		}

		// Select the client for the requested search engine and prepare the request.
		TransportClient client = getClient(params.getSearchEngineType());
		SearchRequestBuilder request = client.prepareSearch(getSearchIndex(params))
				.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
				.setTypes(EsSearchUtils.getType(params));

		// The query is optional: without one the request has no query clause.
		if (query != null) {
			request.setQuery(new QueryStringQueryBuilder(query.toString()));
		}

		// Only the aggregation is needed, so no hits are requested.
		request.addAggregation(topTerms).setFrom(0).setSize(0);

		// Log the request that is about to be sent.
		infoQuery(params.getTransactionId()+ ":normal:" + request.toString());

		SearchResponse response = request.execute().actionGet(TimeUnit.MINUTES.toMillis(5));

		List<IGroupResult> results = new ArrayList<IGroupResult>();
		Terms agg = response.getAggregations() == null ? null : (Terms) response.getAggregations().get("top");
		if (agg == null) {
			return results;
		}

		for (Bucket bucket : agg.getBuckets()) {
			IGroupResult groupResult = new IGroupResult();
			groupResult.setGroupValue(bucket.getKey().toString());
			// Narrow without a string round-trip; fails loudly on overflow.
			groupResult.setTotal(Math.toIntExact(bucket.getDocCount()));
			results.add(groupResult);
		}

		return results;
	}


	/**
	 * Counts the documents matching {@code params}.
	 *
	 * @param params search parameters; they determine the query, the client,
	 *               the indices and the type
	 * @return total number of matching documents as reported by the search response
	 * @throws Exception if building the query, obtaining the client or executing the search fails
	 */
	@Override
	public long searchTotal(Params params) throws Exception {

		// Translate the search parameters into a Lucene query.
		Query query = EsSearchUtils.createLuceneQuery(params);

		// Log the query; String.valueOf keeps this safe when no query was built.
		L.info(params.getTransactionId(), " normal", String.valueOf(query));

		// Select the client for the requested search engine.
		TransportClient client = getClient(params.getSearchEngineType());

		// Only the total is read, so no hits need to be fetched (size 0).
		SearchRequestBuilder request = client.prepareSearch(getSearchIndex(params))
				.setTypes(EsSearchUtils.getType(params))
				.setSize(0);

		// Same guard as stat(): skip the query clause when no query could be built.
		if (query != null) {
			request.setQuery(new QueryStringQueryBuilder(query.toString()));
		}

		SearchResponse response = request.execute().actionGet(TimeUnit.MINUTES.toMillis(5));

		return response.getHits().getTotalHits();
	}

//	@Override
//	public Date getLastDateIndexTime(Params params) throws Exception {
//		
//       // 设置es
//		int searchEngineType = params.getSearchEngineType();
//		TransportClient client =  getClient(searchEngineType);
//		SearchRequestBuilder  request =  client.prepareSearch(getSearchIndex(params));
//		request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
//		       .setTypes(EsSearchUtils.getType(params));
//			
//		
//		String indexTimeField = EsSearchUtils.getIndexField(params);
//		
//		request.storedFields(indexTimeField).addSort(indexTimeField, SortOrder.DESC);
//		request.setFrom(0).setSize(1);
//		
//		SearchResponse  response  = request.execute().actionGet(SystemConstants.ES_SEARCH_MAX_TIME);
//		
//		for (SearchHit entry : response.getHits().getHits()) {
//			String date = entry.getFields().get(indexTimeField).getValue().toString();
//			return DateUtilsIis.parse(date);
//		}
//			
//		return null;
//	}
//
//
//
//	@SuppressWarnings("unchecked")
//	@Override
//	public <T> List<T> searchByEsForCursorGetData(Params params, long num, Class<T> returnClass)
//			throws Exception {
//		
//		// 类型   避免多次 equals
//		int returnType = 0;
//		
//		if (returnClass.equals(String.class)){
//			returnType = 1;
//		}else if (returnClass.equals(IContentCommonNet.class)){
//			returnType = 2;
//		}else{
//			throw new RuntimeException(" un support " + returnClass);
//		}
//		
//		// 查询query
//		QueryBuilder  query = ParamsUtils.createEsqueryBuilderForHight(params);
//		
//       // 设置es
//		int searchEngineType = params.getSearchEngineType();
//		TransportClient client =  getClient(searchEngineType);
//		SearchRequestBuilder  request =  client.prepareSearch(getSearchIndex(params));
//		request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
//		       .setTypes(EsSearchUtils.getType(params));
//		
//		
//		// 每次拉取数量
//		int size = (int)num < SystemConstants.ES_CURSOR_PAGE_SIZE && num > 0 ? (int)num  : SystemConstants.ES_CURSOR_PAGE_SIZE ;
//		
//		
//		// 过程开始时间
//		long processStartTime = System.currentTimeMillis();
//
//		// 单步开始时间
//		long startTime = System.currentTimeMillis();
//		
//		// 执行搜索
//		SearchResponse response = request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC).setScroll(new TimeValue(SystemConstants.ES_CURSOR_PAGE_SIZE * 40))
//		.setQuery(query).setSize(size).execute().actionGet(SystemConstants.ES_CURSOR_PAGE_SIZE * 40);
//		
//		String scrollId = response.getScrollId();
//		String tid = params.getTransactionId();
//		
//		System.out.println(tid + " cursor list total size is [" + response.getHits().getHits().length + "] , cursor server total used time " + (System.currentTimeMillis() - startTime));
//		
//		
//		// 返回结果
//		List<T> list = new ArrayList<T>();
//		
//		
//
//		while (true && num != list.size()) {
//			for (SearchHit hit : response.getHits().getHits()) {
//				
//				// 判断数据是否满足
//				if (num == list.size()){
//					break;
//				}
//				
//				// 如果是string
//				if (returnType == 1){
//					list.add((T)hit.getId());
//				}else if (returnType == 2){
//					
//					IContentCommonNet  icc = new IContentCommonNet();
//					EsSearchUtils.doFillIContentCommonObj(icc, hit.getSource());
//					list.add((T)icc);
//				}else{
//					throw new RuntimeException(" un support " + returnClass);
//				}
//			}
//			
//			// 如果没有数量了
//			if (response.getHits().getHits().length < size){
//				break;
//			}
//			
//			startTime = System.currentTimeMillis();
//			response = client.prepareSearchScroll(scrollId)
//					   .setScroll(new TimeValue(SystemConstants.ES_CURSOR_PAGE_SIZE * 40))
//					   .execute().actionGet(SystemConstants.ES_CURSOR_PAGE_SIZE * 40);
//			
//			System.out.println(tid + " cursor list total size is [" + response.getHits().getHits().length + "] , cursor server total used time " + (System.currentTimeMillis() - startTime));
//			if (response.getHits().getHits().length == 0) {
//				break;
//			} else {
//				scrollId = response.getScrollId();
//			}
//			
//			
//		}
//		
//		System.out.println(tid + " cursor end list total size is [" + list.size() + "] , cursor server total used time " + (System.currentTimeMillis() - processStartTime));
//		
//		return list;
//	}
//
//	@Override
//	public <T> List<T> searchByEsForCursorGetData(Params params, Class<T> returnClass)
//			throws Exception {
//		return searchByEsForCursorGetData(params, -1, returnClass);
//	}
//
//
	/**
	 * Creates a transport client connected to the single node described by
	 * {@code config}. Sniffing is disabled, so only the configured address is used.
	 *
	 * <p>The returned client is not closed here; closing it is left to whoever
	 * holds the reference (hence the suppressed resource warning).
	 *
	 * @param config server configuration providing cluster name, host and port
	 * @return a new client with the configured transport address added
	 * @throws Exception if the host cannot be resolved or the client cannot be set up
	 */
	@SuppressWarnings("resource")
	@Override
	protected TransportClient createClient(EsServerConfig config) throws Exception {

		Settings settings = Settings.builder()
				.put("client.transport.sniff", false)
				.put("cluster.name", config.getClusterName())
				.build();

		// Resolve the address BEFORE constructing the client: a failed host
		// lookup must not leave a half-initialised client (and its threads) behind.
		InetAddress host = InetAddress.getByName(config.getHost());
		InetSocketTransportAddress address = new InetSocketTransportAddress(host, config.getPort());

		TransportClient client = new PreBuiltTransportClient(settings);
		try {
			return client.addTransportAddress(address);
		} catch (RuntimeException e) {
			// Do not leak the client if the address could not be added.
			client.close();
			throw e;
		}
	}
//	
//	
//	@Override
//	public boolean updateDataForEs(Params params, String id, XContentBuilder dataBuilder)
//			throws Exception {
//		
//		 // 设置es
//		int searchEngineType = params.getSearchEngineType();
//		
//		TransportClient client =  getClient(searchEngineType);
//		IndexRequestBuilder  request =  client.prepareIndex(getSearchIndex(params)[0], EsSearchUtils.getType(params) , id);
//				
//		 request.setSource(dataBuilder)
//		.execute().actionGet(SystemConstants.ES_WRITE_MAX_TIME);
//		
//		return true;
//	}
//
//
//	@Override
//	public boolean saveDataForEs(Params params, String id, XContentBuilder dataBuilder)
//			throws Exception {
//		
//		 // 设置es  
//		int searchEngineType = params.getSearchEngineType();
//		
//		TransportClient client =  getClient(searchEngineType);
//		IndexRequestBuilder  request =  client.prepareIndex(getSearchIndex(params)[0], EsSearchUtils.getType(params) , id);
//				
//		 request.setSource(dataBuilder)
//		.execute().actionGet(SystemConstants.ES_WRITE_MAX_TIME);
//		
//		return true;
//	}
//
//	@Override
//	protected TransportClient createClient(EsServerConfig config) throws Exception {
//		// TODO Auto-generated method stub
//		return null;
//	}
//
//	@Override
//	public <R> PaginationSupport<R> searchGroup(Params params, int page, int pageSize, Class<R> resultCla)
//			throws Exception {
//		// TODO Auto-generated method stub
//		return null;
//	}
//
//	@Override
//	public List<IGroupResult> stat(Params params, int statCount, int minCount) throws Exception {
//		// TODO Auto-generated method stub
//		return null;
//	}

	/**
	 * Not implemented in this client: always returns {@code null} and never
	 * contacts Elasticsearch.
	 *
	 * <p>NOTE(review): judging by the name, this is meant to return the most
	 * recent index time of the data selected by {@code params} — confirm against
	 * the {@code EsClient} contract before implementing.
	 *
	 * @param params search parameters (currently ignored)
	 * @return always {@code null}
	 */
	@Override
	public Date getLastDateIndexTime(Params params) throws Exception {
		// Placeholder: no lookup is performed yet.
		return null;
	}

	/**
	 * Not implemented in this client: always returns {@code null} (not an empty
	 * list) and never contacts Elasticsearch, so callers must null-check the result.
	 *
	 * <p>NOTE(review): presumably intended as a cursor/scroll based bulk read of
	 * up to {@code num} results — confirm against the {@code EsClient} contract.
	 *
	 * @param params      search parameters (currently ignored)
	 * @param num         requested number of results (currently ignored)
	 * @param returnClass element type of the result list (currently ignored)
	 * @return always {@code null}
	 */
	@Override
	public <T> List<T> searchByEsForCursorGetData(Params params, long num, Class<T> returnClass) throws Exception {
		// Placeholder: no search is performed yet.
		return null;
	}

	/**
	 * Convenience overload without a result limit: delegates to
	 * {@link #searchByEsForCursorGetData(Params, long, Class)} with a count of
	 * {@code -1}, so both overloads always behave consistently.
	 *
	 * <p>NOTE(review): {@code -1} is assumed to mean "no limit", as in the
	 * earlier implementation of this method — confirm when the three-argument
	 * overload is implemented.
	 *
	 * @param params      search parameters, passed through unchanged
	 * @param returnClass element type of the result list, passed through unchanged
	 * @return whatever the three-argument overload returns for a count of {@code -1}
	 * @throws Exception if the three-argument overload fails
	 */
	@Override
	public <T> List<T> searchByEsForCursorGetData(Params params, Class<T> returnClass) throws Exception {
		return searchByEsForCursorGetData(params, -1, returnClass);
	}

	/**
	 * Not implemented in this client: nothing is written to Elasticsearch and
	 * the method always reports failure by returning {@code false}.
	 *
	 * @param params      parameters of the target index (currently ignored)
	 * @param id          document id (currently ignored)
	 * @param dataBuilder document content (currently ignored)
	 * @return always {@code false}
	 */
	@Override
	public boolean updateDataForEs(Params params, String id, XContentBuilder dataBuilder) throws Exception {
		// Placeholder: no update request is sent yet.
		return false;
	}

	/**
	 * Not implemented in this client: nothing is written to Elasticsearch and
	 * the method always reports failure by returning {@code false}.
	 *
	 * @param params      parameters of the target index (currently ignored)
	 * @param id          document id (currently ignored)
	 * @param dataBuilder document content (currently ignored)
	 * @return always {@code false}
	 */
	@Override
	public boolean saveDataForEs(Params params, String id, XContentBuilder dataBuilder) throws Exception {
		// Placeholder: no index request is sent yet.
		return false;
	}

	/**
	 * Builds, logs and returns — without executing it — the search request for
	 * {@code params}, so the caller can add paging, sorting or aggregations
	 * before running it.
	 *
	 * @param params search parameters; they determine the query, the client,
	 *               the indices and the type
	 * @return the prepared, not yet executed request
	 * @throws Throwable if building the query or obtaining the client fails
	 */
	@Override
	public SearchRequestBuilder getSearchRequest(Params params) throws Throwable {

		// Translate the search parameters into a Lucene query.
		Query query = EsSearchUtils.createLuceneQuery(params);

		// Select the client for the requested search engine and prepare the request.
		TransportClient client = getClient(params.getSearchEngineType());
		SearchRequestBuilder request = client.prepareSearch(getSearchIndex(params))
				.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
				.setTypes(EsSearchUtils.getType(params));

		// Same guard as stat(): when no query could be built, the request is
		// returned without a query clause instead of failing on query.toString().
		if (query != null) {
			request.setQuery(new QueryStringQueryBuilder(query.toString()));
		}

		// Log the request as it is handed to the caller.
		L.info(params.getTransactionId(), " normal", request.toString());

		return request;
	}

	
	
	
	


}
